package org.xukai.spark.streaming.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count over text files that appear in a directory.
 *
 * Every 3 seconds the word counts for the newly read lines are printed.
 *
 * Usage: `HdfsWordCount [directory]`. When `directory` is omitted, the
 * default location `hdfs://xukai1:9000/streaming/` is monitored.
 *
 * @author Chen Chao
 */
object HdfsWordCount {

  /** Directory monitored when no path is given on the command line. */
  private val DefaultDirectory = "hdfs://xukai1:9000/streaming/"

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the monitored directory
   */
  def main(args: Array[String]): Unit = {
    // Only default to a local master when none was supplied (e.g. via
    // spark-submit --master): values set directly on SparkConf take the
    // highest precedence and would otherwise override the launcher's setting.
    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[2]")
      .setAppName("HdfsWordCount")
    StreamingExamples.setStreamingLogLevels()

    val directory = args.headOption.getOrElse(DefaultDirectory)

    // Create the StreamingContext with a 3-second batch interval.
    val ssc = new StreamingContext(conf, Seconds(3))

    // Create a file input DStream that watches `directory` for new text files.
    val lines = ssc.textFileStream(directory)
    // Split on runs of whitespace (spaces, tabs) and drop empty tokens so that
    // repeated or leading whitespace is not counted as an empty "word".
    val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

